Análisis de cambio en temperaturas a nivel global por ciudades

Versión preliminar

Descripción

En este trabajo se utilizan dos conjuntos de datos para hacer un análisis del cambio de temperatura a nivel global por ciudades. El primer conjunto de datos tiene datos de temperatura desde 1975 por ciudades del mundo y el segundo conjunto de datos son datos adicionales de ciudades que incluyen su elevación en metros sobre el nivel del mar.

El objetivo es hacer el cruce de dos conjuntos de datos para ver la relación entre la variación de temperatura y las ciudades según su altitud.

Obtendremos gráficas representativas de variables como la relación entre incremento de temperatura y tiempo, Países con mayor incremento de temperatura, relación entre el incremento de la temperatura y la altitud de las ciudades.

También se utilizará la regresión lineal para obtener un modelo de predicción de la variación de temperatura para los siguientes 50 años, también un modelo de predicción similar pero relacionado a la relación entre el incremento de temperatura y la altitud de las ciudades.

Como se trata de un conjunto de datos grande se aprovechará una infraestructura de clúster spark con un maestro y dos workers para ejecutar tareas distribuidas, las tareas que se ejecutarán de forma distribuida son:

  • Cargado de los conjuntos de datos en dataframes spark.
  • Tratamiento de datos nulos evaluando si aplicar eliminación o alguna técnica para compensar la falta de datos.
  • Mezclado de ambos dataframes en uno solo.
  • Aplicación de funciones UDF para ayudar con las tareas de limpieza y procesamiento.
  • Aplicación de la técnica de regresión lineal simple o regresión lineal polinomial según corresponda para obtener el modelo de predicción.
  • Guardado del dataframe procesado en archivos parquet.
In [1]:
import os
memory = '6g'
pyspark_submit_args = ' --driver-memory ' + memory + ' pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args
In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MLApp2").getOrCreate()
from pyspark.ml.linalg import Vectors
import pandas as pd
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
In [3]:
sc = spark.sparkContext 
In [4]:
!ls
cities-altitude-sparql.csv    elevationsByCity.csv
eda-1000000_v2.ipynb	      GlobalLandTemperaturesByCity.1000000.csv
eda-1000000_v3-Copy1.ipynb    GlobalLandTemperaturesByCity-2M.csv
eda-1000000_v3.ipynb	      GlobalLandTemperaturesByCity.csv
eda-v1.ipynb		      MPIData_augmented.csv
eda_v3-4M.ipynb		      spark-warehouse
eda_v3.ipynb		      temp-plot.html
elevationsByCity.1000000.csv  Untitled.ipynb
elevationsByCity-2M.csv
In [5]:
#dtf1 = spark.read.csv('hdfs:///datasets/GlobalLandTemperaturesByCity.1000000.csv',
dtf1 = spark.read.csv('./GlobalLandTemperaturesByCity-2M.csv',
                       inferSchema=True, 
                       header=True)
In [6]:
dtf1.show(2)
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 2 rows

In [7]:
type(dtf1)
Out[7]:
pyspark.sql.dataframe.DataFrame
In [8]:
print(dtf1.count())
1999999

EDA

Dataset Temperatura Promedio por ciudades

Este dataset contiene registros de temperatura promedio por día en ciudades del mundo, también tiene datos de de psocionamiento geográficos como latitud y longitud.

Se empieza analizando la ausencia de valores, haciendo ajustes en los tipos de datos esto por que algunas operaciones posteriores requieren que las variables estén en un tipo de dato estándar de spark.

In [9]:
dtf1.printSchema()
root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [10]:
# Paises
dtf1.createOrReplaceTempView('dtf_temp')
query = 'SELECT count(DISTINCT Country) FROM dtf_temp'
djtf = spark.sql(query)
djtf.explain()
djtf.show()
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(distinct Country#14)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(distinct Country#14)])
      +- *(2) HashAggregate(keys=[Country#14], functions=[])
         +- Exchange hashpartitioning(Country#14, 200)
            +- *(1) HashAggregate(keys=[Country#14], functions=[])
               +- *(1) FileScan csv [Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/jovyan/proy/GlobalLandTemperaturesByCity-2M.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Country:string>
+-----------------------+
|count(DISTINCT Country)|
+-----------------------+
|                    106|
+-----------------------+

In [11]:
# Comprobando nulos en AverageTemperature
from pyspark.sql.functions import isnan, when, count, col
dtf1.where(col('AverageTemperature').isNull()).count()
Out[11]:
92521
In [12]:
import pyspark.sql.functions as f
dtf1.where(col('AverageTemperature').isNull() & (col('Country') == 'Denmark')).show(5)
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-08-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows

Como existen varios registros con ausencia de valor y existe un dataset con las elevaciones por latitud y longitud que tiene la misma cantidad de filas, para evitar inconsistencias se unirán ambos datasets y posteriormente se procederá a hacer el tratamiento de valores ausentes.

In [13]:
# convirtiendo a cadena la columna timestamps
from pyspark.sql.types import IntegerType, StringType, DoubleType
from pyspark.sql.functions import udf

# registrando UDF para conversion
def timestampsToString(dt):
    return str(dt) + ''

timestampToString_udf = udf(lambda z: timestampsToString(z), StringType())
In [14]:
dtf1.select('dt', timestampToString_udf('dt').alias('dt_str')).show(5)
+-------------------+-------------------+
|                 dt|             dt_str|
+-------------------+-------------------+
|1743-11-01 00:00:00|1743-11-01 00:00:00|
|1743-12-01 00:00:00|1743-12-01 00:00:00|
|1744-01-01 00:00:00|1744-01-01 00:00:00|
|1744-02-01 00:00:00|1744-02-01 00:00:00|
|1744-03-01 00:00:00|1744-03-01 00:00:00|
+-------------------+-------------------+
only showing top 5 rows

In [15]:
# aplicando udf y reemplazando columna a tipo de dato String
dtf1 = dtf1.withColumn('dt', timestampToString_udf('dt'))
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

Dataset Elevaciones (metros sobre el nivel del mar)

El dataset 2 contiene datos de elevación (altitud sobre el nivel del mar) de los puntos cardinales de las ciudades del dataset 1.

Se ha extraído de la base de datos http://srtm.csi.cgiar.org usando la herramienta https://github.com/Jorl17/open-elevation con el script elevation-getter.py que consulta a un servicios web de una instancia de open-elevation, sen envían como parámetros los puntos cardinales (latitud, longitud) de cada registro del dataset 1 y se obtiene la elevación en metros sobre el nivel del mar.

In [16]:
## Cargando dataset elevationsByCity.csv
#dtf2 = spark.read.csv('hdfs:///datasets/elevationsByCity.1000000.csv',
dtf2 = spark.read.csv('./elevationsByCity-2M.csv',
                       inferSchema=True, 
                       header=True)
In [17]:
dtf2.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [18]:
dtf2Len = dtf2.count()
print(dtf2Len)
1999999
In [19]:
# valores marcados como erroneos
missingElevation = dtf2.filter(dtf2.Elevation == -5555)
missingElevation.show()
+--------+---------+---------+
|Latitude|Longitude|Elevation|
+--------+---------+---------+
|  21.70N|   77.02E|    -5555|
|  36.17N|  139.23E|    -5555|
|  50.63N|   13.94E|    -5555|
+--------+---------+---------+

In [20]:
# teniendo las latitudes y longitudes con elevacion errónea, la forma de corregir es
# copiando la elevación de un punto que tenga las mismas coordenadas geográficas.
lMissing = missingElevation.collect()
In [21]:
elevations = {}
for lm in lMissing:
    temp = dtf2.filter((dtf2.Latitude == lm["Latitude"]) & (dtf2.Longitude == lm["Longitude"])).head(5)
    elevation = 0
    for i in range(0,5):
        if temp[i][2] != -5555:
            elevation = temp[i][2]
            break
    elevations[temp[1][0]] = elevation
    # elevations.append((temp[1][0], temp[1][1], elevation))
print(elevations)
{'21.70N': 336, '36.17N': 77, '50.63N': 285}
In [22]:
def replaceMissingElevation(latitude, elevation):
    if elevation == -5555:
        return elevations[latitude]
    return elevation
replaceMissingElevation_udf = udf(replaceMissingElevation, IntegerType())
In [23]:
# aplicando correccion
dfe = dtf2.withColumn("Elevation", replaceMissingElevation_udf(dtf2['Latitude'], dtf2['Elevation']))
dfe.show(3)
print(dfe.count())
+--------+---------+---------+
|Latitude|Longitude|Elevation|
+--------+---------+---------+
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
|  57.05N|   10.33E|        8|
+--------+---------+---------+
only showing top 3 rows

1999999
In [24]:
# comprobando
print("total registros:", dfe.count())
print("Registros erróneos:", dfe.filter(dfe.Elevation == -5555).count())
total registros: 1999999
Registros erróneos: 0
In [25]:
dfe.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [26]:
dfe.filter(dfe.Elevation > 2500).count()
Out[26]:
17204
In [27]:
# Comprobando ausencia de valores en latitud y longitud
print(dfe.columns)
print(dtf1.columns)

print("Nulls in dfe")
dfe.select([count(when(isnan(c), c)).alias(c) for c in ['Latitude', 'Longitude']]).show()
print("Nulls in dtf1")
dtf1.select([count(when(isnan(c), c)).alias(c) for c in ['Latitude', 'Longitude']]).show()
['Latitude', 'Longitude', 'Elevation']
['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'City', 'Country', 'Latitude', 'Longitude']
Nulls in dfe
+--------+---------+
|Latitude|Longitude|
+--------+---------+
|       0|        0|
+--------+---------+

Nulls in dtf1
+--------+---------+
|Latitude|Longitude|
+--------+---------+
|       0|        0|
+--------+---------+

In [28]:
dtf1.printSchema()
root
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

In [29]:
dfe.printSchema()
root
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

In [30]:
# Comprobando que latitud y longitud sean consistentes en ambos datasets
dtf1.select("Latitude", "Longitude").exceptAll(dfe.select("Latitude", "Longitude")).count()
Out[30]:
0
In [31]:
dtf1.select([count(when(isnan(c), c)).alias(c) for c in ['AverageTemperature','AverageTemperatureUncertainty']]).show()
+------------------+-----------------------------+
|AverageTemperature|AverageTemperatureUncertainty|
+------------------+-----------------------------+
|                 0|                            0|
+------------------+-----------------------------+

In [32]:
# ocurren algunos errores al unir los dataframes con join por columnas 
# latitud y longitud, entonces se crea una columna nueva id para 
# evitar errores al mezclar los dataframes.
In [33]:
from pyspark.sql import functions as F
from pyspark.sql import Window
# Agregando una columna indice para obtener por rangos
window = Window.orderBy(F.col('Latitude'), F.col('Longitude') )
dfe = dfe.withColumn('id', F.row_number().over(window))
dfe.show(10)
print(dfe.agg({"id": "max"}).collect()[0])
#dfeLen = dfe.count()
#print("count", dfeLen)
# window = Window.orderBy(F.col('Latitude'), F.col('Longitude'))
dtf1 = dtf1.withColumn('id', F.row_number().over(window))
dtf1.show(10)
print(dtf1.agg({"id": "max"}).collect()[0])
#dtf1Len = dtf1.count()
#print("count", dtf1Len)
+--------+---------+---------+---+
|Latitude|Longitude|Elevation| id|
+--------+---------+---------+---+
|   0.80N|  118.13E|        0|  1|
|   0.80N|  118.13E|        0|  2|
|   0.80N|  118.13E|        0|  3|
|   0.80N|  118.13E|        0|  4|
|   0.80N|  118.13E|        0|  5|
|   0.80N|  118.13E|        0|  6|
|   0.80N|  118.13E|        0|  7|
|   0.80N|  118.13E|        0|  8|
|   0.80N|  118.13E|        0|  9|
|   0.80N|  118.13E|        0| 10|
+--------+---------+---------+---+
only showing top 10 rows

Row(max(id)=1999999)
+-------------------+------------------+-----------------------------+-------+---------+--------+---------+---+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|  Country|Latitude|Longitude| id|
+-------------------+------------------+-----------------------------+-------+---------+--------+---------+---+
|1825-01-01 00:00:00|            26.765|                        1.984|Bontang|Indonesia|   0.80N|  118.13E|  1|
|1825-02-01 00:00:00|            26.097|                        1.409|Bontang|Indonesia|   0.80N|  118.13E|  2|
|1825-03-01 00:00:00|            26.476|                        1.879|Bontang|Indonesia|   0.80N|  118.13E|  3|
|1825-04-01 00:00:00|             27.33|                        2.024|Bontang|Indonesia|   0.80N|  118.13E|  4|
|1825-05-01 00:00:00|26.764000000000006|                        1.155|Bontang|Indonesia|   0.80N|  118.13E|  5|
|1825-06-01 00:00:00|            27.403|                        1.135|Bontang|Indonesia|   0.80N|  118.13E|  6|
|1825-07-01 00:00:00|            26.775|                        2.019|Bontang|Indonesia|   0.80N|  118.13E|  7|
|1825-08-01 00:00:00|26.241999999999997|                        1.963|Bontang|Indonesia|   0.80N|  118.13E|  8|
|1825-09-01 00:00:00|            25.963|           1.5519999999999998|Bontang|Indonesia|   0.80N|  118.13E|  9|
|1825-10-01 00:00:00|25.639000000000006|           1.6669999999999998|Bontang|Indonesia|   0.80N|  118.13E| 10|
+-------------------+------------------+-----------------------------+-------+---------+--------+---------+---+
only showing top 10 rows

Row(max(id)=1999999)
In [34]:
ndf = dtf1.join(dfe, on=["id"], how="inner")
ndf.show()
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
| id|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|  Country|Latitude|Longitude|Latitude|Longitude|Elevation|
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
|  1|1825-01-01 00:00:00|            26.765|                        1.984|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  2|1825-02-01 00:00:00|            26.097|                        1.409|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  3|1825-03-01 00:00:00|            26.476|                        1.879|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  4|1825-04-01 00:00:00|             27.33|                        2.024|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  5|1825-05-01 00:00:00|26.764000000000006|                        1.155|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  6|1825-06-01 00:00:00|            27.403|                        1.135|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  7|1825-07-01 00:00:00|            26.775|                        2.019|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  8|1825-08-01 00:00:00|26.241999999999997|                        1.963|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  9|1825-09-01 00:00:00|            25.963|           1.5519999999999998|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 10|1825-10-01 00:00:00|25.639000000000006|           1.6669999999999998|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 11|1825-11-01 00:00:00|            26.967|           1.7169999999999999|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 12|1825-12-01 00:00:00|26.055999999999997|                         1.34|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 13|1826-01-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 14|1826-02-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 15|1826-03-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 16|1826-04-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 17|1826-05-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 18|1826-06-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 19|1826-07-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
| 20|1826-08-01 00:00:00|              null|                         null|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
only showing top 20 rows

In [35]:
print(ndf.count())
1999999
In [36]:
ndf.columns
Out[36]:
['id',
 'dt',
 'AverageTemperature',
 'AverageTemperatureUncertainty',
 'City',
 'Country',
 'Latitude',
 'Longitude',
 'Latitude',
 'Longitude',
 'Elevation']

Tratamiento de ausencia de valores de temperatura

Para la columna AverageTemperature (temperatura promedio por día) haciendo una generalización y para evitar la estimaciones erróneas se ha optado por eliminar las filas con temperatura promedio sin valor. Esto por que la ausencia de valores se parece dar en una proporción pequeña de casos menor al 5%.

Por otro no es seguro intentar estimar o rellenar la ausencia de este dato reemplazando por un valor promedio ya que esto implicaría ver los casos particulares por ciudades y si existe una minoría de registros que no tengan el dato de temperatura promedio y esto podría representar mas trabajo a detalle que podría no tener relevancia en el resumen final de datos.

In [37]:
ndf.filter(ndf.AverageTemperature.isNull()).count()
Out[37]:
92521
In [38]:
# Eliminando las filas con dato de temperatura promedio 
ndf = ndf.filter(ndf.AverageTemperature.isNotNull())
In [39]:
ndf.show(5)
print('count:', ndf.count())
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
| id|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|  Country|Latitude|Longitude|Latitude|Longitude|Elevation|
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
|  1|1825-01-01 00:00:00|            26.765|                        1.984|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  2|1825-02-01 00:00:00|            26.097|                        1.409|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  3|1825-03-01 00:00:00|            26.476|                        1.879|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  4|1825-04-01 00:00:00|             27.33|                        2.024|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  5|1825-05-01 00:00:00|26.764000000000006|                        1.155|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
only showing top 5 rows

count: 1907478

La columna AverageTemperatureUncertainty indica un valor de incertidumbre para la temperatura promedio con un intervalo de confianza de 95%, en los casos en que no se tiene este valor en un registro (columna) se lo rellena con la mediana global a modo de generalizar el error.

In [40]:
del dtf1
del dfe
del dtf2
In [41]:
# from pyspark.sql.functions import approxQuantile
med = ndf.approxQuantile("AverageTemperatureUncertainty", [0.5], 0.25)
print(med)
AverageTemperatureUncertainty_median = med[0]
[0.61]
In [42]:
# reemplazando ausencia de valores con la mediana
replaceNullWithMedian_udf = udf(lambda temp: AverageTemperatureUncertainty_median if temp is None else temp, DoubleType())
ndf = ndf.withColumn("AverageTemperatureUncertainty", replaceNullWithMedian_udf(ndf['AverageTemperatureUncertainty']))
ndf.show(5)
ndf.count()
#ndf.filter(ndf.AverageTemperatureUncertainty.isNull()).count()
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
| id|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|  Country|Latitude|Longitude|Latitude|Longitude|Elevation|
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
|  1|1825-01-01 00:00:00|            26.765|                        1.984|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  2|1825-02-01 00:00:00|            26.097|                        1.409|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  3|1825-03-01 00:00:00|            26.476|                        1.879|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  4|1825-04-01 00:00:00|             27.33|                        2.024|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
|  5|1825-05-01 00:00:00|26.764000000000006|                        1.155|Bontang|Indonesia|   0.80N|  118.13E|   0.80N|  118.13E|        0|
+---+-------------------+------------------+-----------------------------+-------+---------+--------+---------+--------+---------+---------+
only showing top 5 rows

Out[42]:
1907478
In [43]:
ndf.printSchema()
root
 |-- id: integer (nullable = true)
 |-- dt: string (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)

Gráficas

En este punto el dataframe ndf ya está listo para hacer gráficas y hacer el análisis estadístico.

In [44]:
ndf_pd = ndf.toPandas()['AverageTemperature']
In [45]:
import plotly
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from plotly.offline import init_notebook_mode, iplot

init_notebook_mode(connected=True)         # initiate notebook for offline plot

Distribución de temperatura promedio

In [46]:
data = [go.Histogram(x=ndf_pd)]

plotly.offline.iplot({
    "data": [go.Histogram(x = ndf_pd)],
    "layout": go.Layout(title="Distribución de temperatura Promedio")
})